/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/** Base class for tasks. */
abstract class Task implements Writable, Configurable {
	private static final Log LOG = LogFactory
			.getLog("org.apache.hadoop.mapred.TaskRunner");

	// Counters used by Task subclasses
	protected static enum Counter {
		MAP_INPUT_RECORDS,
		MAP_OUTPUT_RECORDS,
		MAP_SKIPPED_RECORDS,
		MAP_INPUT_BYTES,
		MAP_OUTPUT_BYTES,
		COMBINE_INPUT_RECORDS,
		COMBINE_OUTPUT_RECORDS,
		REDUCE_INPUT_GROUPS,
		REDUCE_SHUFFLE_BYTES,
		REDUCE_INPUT_RECORDS,
		REDUCE_OUTPUT_RECORDS,
		REDUCE_SKIPPED_GROUPS,
		REDUCE_SKIPPED_RECORDS,
		SPILLED_RECORDS
	}

	/**
	 * Counters that measure the usage of the different file systems. Always
	 * returns a String array with two elements: the first is the name of the
	 * BYTES_READ counter and the second is the name of the BYTES_WRITTEN
	 * counter.
	 */
	protected static String[] getFileSystemCounterNames(String uriScheme) {
		String scheme = uriScheme.toUpperCase();
		return new String[] { scheme + "_BYTES_READ", scheme + "_BYTES_WRITTEN" };
	}
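
	// For example, getFileSystemCounterNames("hdfs") returns
	// { "HDFS_BYTES_READ", "HDFS_BYTES_WRITTEN" }.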

	/**
	 * HaLoop: the task tracker to recover from ("XXX" is the initial
	 * placeholder value).
	 */
	private String recoverFromTaskTracker = "XXX";

	/**
	 * HaLoop: task failure bit
	 */
	private boolean taskFailure = false;

	/**
	 * HaLoop: node failure bit
	 */
	private boolean nodeFailure = false;

	/**
	 * Set the task tracker to recover from.
	 * 
	 * @param tts the task tracker
	 */
	public void setRecoverFromTaskTracker(String tts) {
		recoverFromTaskTracker = tts;
	}

	/**
	 * Set the task failure bit.
	 * 
	 * @param f the task failure bit
	 */
	public void setTaskFailure(boolean f) {
		taskFailure = f;
	}

	/**
	 * Set the node failure bit.
	 * 
	 * @param f the node failure bit
	 */
	public void setNodeFailure(boolean f) {
		nodeFailure = f;
	}

	/**
	 * Get the task failure bit.
	 * 
	 * @return taskFailure
	 */
	public boolean getTaskFailure() {
		return taskFailure;
	}
	
	/**
	 * Get the node failure bit.
	 * 
	 * @return nodeFailure
	 */
	public boolean getNodeFailure() {
		return nodeFailure;
	}
	

	/**
	 * Get the task tracker to recover from.
	 * 
	 * @return the task tracker to recover from
	 */
	public String getRecoverFromTaskTracker() {
		return recoverFromTaskTracker;
	}

	/**
	 * Name of the FileSystem counters' group
	 */
	protected static final String FILESYSTEM_COUNTER_GROUP = "FileSystemCounters";

	// /////////////////////////////////////////////////////////
	// Helper methods to construct task-output paths
	// /////////////////////////////////////////////////////////

	/**
	 * Construct output file names so that, when an output directory listing is
	 * sorted lexicographically, positions correspond to output partitions.
	 */
	private static final NumberFormat NUMBER_FORMAT = NumberFormat
			.getInstance();
	static {
		NUMBER_FORMAT.setMinimumIntegerDigits(5);
		NUMBER_FORMAT.setGroupingUsed(false);
	}

	static synchronized String getOutputName(int partition) {
		return "part-" + NUMBER_FORMAT.format(partition);
	}
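
	// Example: NUMBER_FORMAT pads to at least five digits with grouping
	// disabled, so getOutputName(7) returns "part-00007" and
	// getOutputName(123456) returns "part-123456".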

	static synchronized String getOutputName(int partition, int iteration) {
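		// HaLoop: the iteration argument is currently unused; the name is
		// identical to the one produced by getOutputName(partition).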
		return "part-" + NUMBER_FORMAT.format(partition);
	}

	// //////////////////////////////////////////
	// Fields
	// //////////////////////////////////////////

	private String jobFile; // job configuration file
	private TaskAttemptID taskId; // unique, includes job id
	private int partition; // id within job
	TaskStatus taskStatus; // current status of the task
	protected boolean jobCleanup = false;
	protected boolean jobSetup = false;
	protected boolean taskCleanup = false;

	// skip ranges based on failed ranges from previous attempts
	private SortedRanges skipRanges = new SortedRanges();
	private boolean skipping = false;
	private boolean writeSkipRecs = true;

	// currently processing record start index
	private volatile long currentRecStartIndex;
	private Iterator<Long> currentRecIndexIterator = skipRanges
			.skipRangeIterator();

	protected JobConf conf;
	protected MapOutputFile mapOutputFile = new MapOutputFile();
	protected ReduceOutputFile_leon reduceOutputFile = new ReduceOutputFile_leon();
	protected LocalDirAllocator lDirAlloc;
	private final static int MAX_RETRIES = 10;
	protected JobContext jobContext;
	protected TaskAttemptContext taskContext;
	protected org.apache.hadoop.mapreduce.OutputFormat<?, ?> outputFormat;
	protected org.apache.hadoop.mapreduce.OutputCommitter committer;
	protected final Counters.Counter spilledRecordsCounter;
	private String pidFile = "";
	protected TaskUmbilicalProtocol umbilical;

	// //////////////////////////////////////////
	// Constructors
	// //////////////////////////////////////////

	public Task() {
		taskStatus = TaskStatus.createTaskStatus(isMapTask());
		taskId = new TaskAttemptID();
		spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
		try {
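			// debug: print the host on which this task object is created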
			java.net.InetAddress localMachine = java.net.InetAddress
					.getLocalHost();
			System.out.println(localMachine.getHostName());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public Task(String jobFile, TaskAttemptID taskId, int partition) {
		try {
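			// debug: print the host on which this task object is created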
			java.net.InetAddress localMachine = java.net.InetAddress
					.getLocalHost();
			System.out.println(localMachine.getHostName());
		} catch (Exception e) {
			e.printStackTrace();
		}
		this.jobFile = jobFile;
		this.taskId = taskId;

		this.partition = partition;
		this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId,
				0.0f, TaskStatus.State.UNASSIGNED, "", "", "",
				isMapTask() ? TaskStatus.Phase.MAP : TaskStatus.Phase.SHUFFLE,
				counters);
		this.mapOutputFile.setJobId(taskId.getJobID());
		spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
	}

	// //////////////////////////////////////////
	// Accessors
	// //////////////////////////////////////////
	public void setJobFile(String jobFile) {
		this.jobFile = jobFile;
	}

	public String getJobFile() {
		return jobFile;
	}

	public TaskAttemptID getTaskID() {
		return taskId;
	}

	Counters getCounters() {
		return counters;
	}

	public void setPidFile(String pidFile) {
		this.pidFile = pidFile;
	}

	public String getPidFile() {
		return pidFile;
	}

	/**
	 * Get the job ID for this task.
	 * 
	 * @return the job ID
	 */
	public JobID getJobID() {
		return taskId.getJobID();
	}

	/**
	 * Get the index of this task within the job.
	 * 
	 * @return the integer part of the task id
	 */
	public int getPartition() {
		return partition;
	}

	/**
	 * Return the current phase of the task. Needs to be synchronized as the
	 * communication thread sends the phase periodically.
	 * 
	 * @return the current phase of the task
	 */
	public synchronized TaskStatus.Phase getPhase() {
		return this.taskStatus.getPhase();
	}

	/**
	 * Set the current phase of the task.
	 * 
	 * @param phase the phase
	 */
	protected synchronized void setPhase(TaskStatus.Phase phase) {
		this.taskStatus.setPhase(phase);
	}

	/**
	 * Get whether to write skip records.
	 */
	protected boolean toWriteSkipRecs() {
		return writeSkipRecs;
	}

	/**
	 * Set whether to write skip records.
	 */
	protected void setWriteSkipRecs(boolean writeSkipRecs) {
		this.writeSkipRecs = writeSkipRecs;
	}

	/**
	 * Report a fatal error to the parent (task) tracker.
	 */
	protected void reportFatalError(TaskAttemptID id, Throwable throwable,
			String logMsg) {
		LOG.fatal(logMsg);
		Throwable tCause = throwable.getCause();
		String cause = tCause == null ? StringUtils
				.stringifyException(throwable) : StringUtils
				.stringifyException(tCause);
		try {
			umbilical.fatalError(id, cause);
		} catch (IOException ioe) {
			LOG.fatal("Failed to contact the tasktracker", ioe);
			System.exit(-1);
		}
	}

	/**
	 * Get skipRanges.
	 */
	public SortedRanges getSkipRanges() {
		return skipRanges;
	}

	/**
	 * Set skipRanges.
	 */
	public void setSkipRanges(SortedRanges skipRanges) {
		this.skipRanges = skipRanges;
	}

	/**
	 * Is Task in skipping mode.
	 */
	public boolean isSkipping() {
		return skipping;
	}

	/**
	 * Sets whether to run Task in skipping mode.
	 * 
	 * @param skipping
	 */
	public void setSkipping(boolean skipping) {
		this.skipping = skipping;
	}

	/**
	 * Return the current state of the task. Needs to be synchronized as the
	 * communication thread sends the state periodically.
	 * 
	 * @return the current state of the task
	 */
	synchronized TaskStatus.State getState() {
		return this.taskStatus.getRunState();
	}

	/**
	 * Set current state of the task.
	 * 
	 * @param state
	 */
	synchronized void setState(TaskStatus.State state) {
		this.taskStatus.setRunState(state);
	}

	void setTaskCleanupTask() {
		taskCleanup = true;
	}

	boolean isTaskCleanupTask() {
		return taskCleanup;
	}

	boolean isJobCleanupTask() {
		return jobCleanup;
	}

	boolean isJobSetupTask() {
		return jobSetup;
	}

	void setJobSetupTask() {
		jobSetup = true;
	}

	void setJobCleanupTask() {
		jobCleanup = true;
	}

	boolean isMapOrReduce() {
		return !jobSetup && !jobCleanup && !taskCleanup;
	}

	/**
	 * HaLoop: current iteration
	 */
	protected int iteration = 0;

	/**
	 * HaLoop: current step in the loop body
	 */
	protected int step = 0;

	/**
	 * HaLoop: current round = iteration * numOfLoopBodySteps + step
	 */
	protected int round = 0;

	/**
	 * HaLoop: number of loop body steps
	 */
	protected int numOfLoopBodySteps = 0;

	/**
	 * HaLoop: set the current running iteration.
	 * 
	 * @param i the iteration
	 */
	public void setCurrentIteration(int i) {
		iteration = i;
		setRound();
	}

	/**
	 * HaLoop: get the current running iteration.
	 * 
	 * @return the current iteration
	 */
	public int getCurrentIteration() {
		return iteration;
	}

	/**
	 * HaLoop: set the current step in the loop body.
	 * 
	 * @param i the step
	 */
	public void setCurrentStep(int i) {
		step = i;
		setRound();
	}

	/**
	 * HaLoop: get the current step in the loop body.
	 * 
	 * @return the current step
	 */
	public int getCurrentStep() {
		return step;
	}

	/**
	 * HaLoop: set the current map-reduce round.
	 */
	protected void setRound() {
		round = iteration * numOfLoopBodySteps + step;
	}
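
	// Worked example (illustrative): numOfLoopBodySteps = 3, iteration = 2 and
	// step = 1 give round = 2 * 3 + 1 = 7, i.e. rounds number the map-reduce
	// passes consecutively across iterations.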

	/**
	 * HaLoop: return the current map-reduce round.
	 * 
	 * @return the current map-reduce round
	 */
	public int getRound() {
		return round;
	}

	// //////////////////////////////////////////
	// Writable methods
	// //////////////////////////////////////////

	public void write(DataOutput out) throws IOException {
		// Yingyi:
		out.writeInt(iteration);
		out.writeInt(step);
		out.writeInt(numOfLoopBodySteps);
		out.writeInt(round);
		Text.writeString(out, recoverFromTaskTracker);
		out.writeBoolean(this.taskFailure);
		out.writeBoolean(this.nodeFailure);

		Text.writeString(out, jobFile);
		taskId.write(out);
		out.writeInt(partition);
		taskStatus.write(out);
		skipRanges.write(out);
		out.writeBoolean(skipping);
		out.writeBoolean(jobCleanup);
		out.writeBoolean(jobSetup);
		out.writeBoolean(writeSkipRecs);
		out.writeBoolean(taskCleanup);
		Text.writeString(out, pidFile);
	}
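
	// Note: readFields must consume fields in exactly the order write emits
	// them (the HaLoop fields first, then the base Task fields).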

	public void readFields(DataInput in) throws IOException {
		// Yingyi:
		iteration = in.readInt();
		step = in.readInt();
		numOfLoopBodySteps = in.readInt();
		round = in.readInt();
		recoverFromTaskTracker = Text.readString(in);
		taskFailure = in.readBoolean();
		nodeFailure = in.readBoolean();

		jobFile = Text.readString(in);
		taskId = TaskAttemptID.read(in);
		partition = in.readInt();
		taskStatus.readFields(in);
		this.mapOutputFile.setJobId(taskId.getJobID());
		skipRanges.readFields(in);
		currentRecIndexIterator = skipRanges.skipRangeIterator();
		currentRecStartIndex = currentRecIndexIterator.next();
		skipping = in.readBoolean();
		jobCleanup = in.readBoolean();
		jobSetup = in.readBoolean();
		writeSkipRecs = in.readBoolean();
		taskCleanup = in.readBoolean();
		if (taskCleanup) {
			setPhase(TaskStatus.Phase.CLEANUP);
		}
		pidFile = Text.readString(in);
	}

	@Override
	public String toString() {
		return taskId.toString();
	}

	/**
	 * Localize the given JobConf to be specific for this task.
	 */
	public void localizeConfiguration(JobConf conf) throws IOException {
		conf.set("mapred.tip.id", taskId.getTaskID().toString());
		conf.set("mapred.task.id", taskId.toString());
		conf.setBoolean("mapred.task.is.map", isMapTask());
		conf.setInt("mapred.task.partition", partition);
		conf.set("mapred.job.id", taskId.getJobID().toString());
	}
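
	// Illustrative: for a map attempt with an ID like
	// attempt_200707121733_0003_m_000005_0 this sets, e.g.:
	//   mapred.task.id = attempt_200707121733_0003_m_000005_0
	//   mapred.tip.id = task_200707121733_0003_m_000005
	//   mapred.job.id = job_200707121733_0003
	//   mapred.task.is.map = true, mapred.task.partition = 5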

	/**
	 * Run this task as a part of the named job. This method is executed in the
	 * child process and is what invokes user-supplied map, reduce, etc.
	 * methods.
	 * 
	 * @param umbilical
	 *            for progress reports
	 */
	public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical)
			throws IOException, ClassNotFoundException, InterruptedException;

	/**
	 * Return an appropriate thread runner for this task.
	 * 
	 * @param tip
	 *            TODO
	 */
	public abstract TaskRunner createRunner(TaskTracker tracker,
			TaskTracker.TaskInProgress tip) throws IOException;

	/** The number of milliseconds between progress reports. */
	public static final int PROGRESS_INTERVAL = 3000;

	private transient Progress taskProgress = new Progress();

	// Current counters
	private transient Counters counters = new Counters();

	/* flag to track whether task is done */
	private AtomicBoolean taskDone = new AtomicBoolean(false);

	public abstract boolean isMapTask();

	public Progress getProgress() {
		return taskProgress;
	}
	
	public int getNumberOfInputs() {
		return 0;
	}

	public void initialize(JobConf job, JobID id, Reporter reporter,
			boolean useNewApi) throws IOException, ClassNotFoundException,
			InterruptedException {
		jobContext = new JobContext(job, id, reporter);
		taskContext = new TaskAttemptContext(job, taskId, reporter);
		if (getState() == TaskStatus.State.UNASSIGNED) {
			setState(TaskStatus.State.RUNNING);
		}
		if (useNewApi) {
			LOG.debug("using new api for output committer");
			outputFormat = ReflectionUtils.newInstance(taskContext
					.getOutputFormatClass(), job);
			committer = outputFormat.getOutputCommitter(taskContext);
		} else {
			committer = conf.getOutputCommitter();
		}
		Path outputPath = FileOutputFormat.getOutputPath(conf); // was: getOutputPath(conf, this.iteration)
		if (outputPath != null) {
			if ((committer instanceof FileOutputCommitter)) {
				FileOutputFormat.setWorkOutputPath(conf,
						((FileOutputCommitter) committer)
								.getTempTaskOutputPath(taskContext));
			} else {
				FileOutputFormat.setWorkOutputPath(conf, outputPath);
			}
		}
		committer.setupTask(taskContext);
	}

	protected class TaskReporter extends
			org.apache.hadoop.mapreduce.StatusReporter implements Runnable,
			Reporter {
		private TaskUmbilicalProtocol umbilical;
		private InputSplit split = null;
		private Progress taskProgress;
		private Thread pingThread = null;
		/**
		 * Flag that indicates whether a progress update needs to be sent to
		 * the parent: it is set when progress is reported and reset once the
		 * update has been sent. Using AtomicBoolean since we need an atomic
		 * read-and-reset.
		 */
		private AtomicBoolean progressFlag = new AtomicBoolean(false);

		TaskReporter(Progress taskProgress, TaskUmbilicalProtocol umbilical) {
			this.umbilical = umbilical;
			this.taskProgress = taskProgress;
		}

		// getters and setters for flag
		void setProgressFlag() {
			progressFlag.set(true);
		}

		boolean resetProgressFlag() {
			return progressFlag.getAndSet(false);
		}

		public void setStatus(String status) {
			taskProgress.setStatus(status);
			// indicate that progress update needs to be sent
			setProgressFlag();
		}

		public void setProgress(float progress) {
			taskProgress.set(progress);
			// indicate that progress update needs to be sent
			setProgressFlag();
		}

		public void progress() {
			// indicate that progress update needs to be sent
			setProgressFlag();
		}

		public Counters.Counter getCounter(String group, String name) {
			Counters.Counter counter = null;
			if (counters != null) {
				counter = counters.findCounter(group, name);
			}
			return counter;
		}

		public Counters.Counter getCounter(Enum<?> name) {
			return counters == null ? null : counters.findCounter(name);
		}

		public void incrCounter(Enum<?> key, long amount) {
			if (counters != null) {
				counters.incrCounter(key, amount);
			}
			setProgressFlag();
		}

		public void incrCounter(String group, String counter, long amount) {
			if (counters != null) {
				counters.incrCounter(group, counter, amount);
			}
			if (skipping
					&& SkipBadRecords.COUNTER_GROUP.equals(group)
					&& (SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS
							.equals(counter) || SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS
							.equals(counter))) {
				// if the application reports the processed records, move
				// currentRecStartIndex to the next one.
				// currentRecStartIndex is the start index which has not yet
				// been finished and is still in the task's stomach.
				for (int i = 0; i < amount; i++) {
					currentRecStartIndex = currentRecIndexIterator.next();
				}
			}
			setProgressFlag();
		}

		public void setInputSplit(InputSplit split) {
			this.split = split;
		}

		public InputSplit getInputSplit() throws UnsupportedOperationException {
			if (split == null) {
				throw new UnsupportedOperationException(
						"Input only available on map");
			} else {
				return split;
			}
		}

		/**
		 * The communication thread handles communication with the parent (Task
		 * Tracker). It sends progress updates if progress has been made or if
		 * the task needs to let the parent know that it's alive. It also pings
		 * the parent to see if it's alive.
		 */
		public void run() {
			final int MAX_RETRIES = 3;
			int remainingRetries = MAX_RETRIES;
			// get current flag value and reset it as well
			boolean sendProgress = resetProgressFlag();
			while (!taskDone.get()) {
				try {
					boolean taskFound = true; // whether TT knows about this
					// task
					// sleep for a bit
					try {
						Thread.sleep(PROGRESS_INTERVAL);
					} catch (InterruptedException e) {
						LOG.debug(getTaskID()
								+ " Progress/ping thread exiting "
								+ "since it got interrupted");
						break;
					}

					if (sendProgress) {
						// we need to send progress update
						updateCounters();
						taskStatus.statusUpdate(taskProgress.get(),
								taskProgress.toString(), counters);
						taskFound = umbilical.statusUpdate(taskId, taskStatus);
						taskStatus.clearStatus();
					} else {
						// send ping
						taskFound = umbilical.ping(taskId);
					}

					// if Task Tracker is not aware of our task ID (probably
					// because it died and
					// came back up), kill ourselves
					if (!taskFound) {
						LOG.warn("Parent died.  Exiting " + taskId);
						System.exit(66);
					}

					sendProgress = resetProgressFlag();
					remainingRetries = MAX_RETRIES;
				} catch (Throwable t) {
					LOG.info("Communication exception: "
							+ StringUtils.stringifyException(t));
					remainingRetries -= 1;
					if (remainingRetries == 0) {
						ReflectionUtils.logThreadInfo(LOG,
								"Communication exception", 0);
						LOG.warn("Last retry, killing " + taskId);
						System.exit(65);
					}
				}
			}
		}

		public void startCommunicationThread() {
			if (pingThread == null) {
				pingThread = new Thread(this, "communication thread");
				pingThread.setDaemon(true);
				pingThread.start();
			}
		}

		public void stopCommunicationThread() throws InterruptedException {
			if (pingThread != null) {
				pingThread.interrupt();
				pingThread.join();
			}
		}
	}
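
	// Typical lifecycle (illustrative): a child task constructs a TaskReporter,
	// starts the communication thread before running user code, and stops it
	// when the task finishes, e.g.
	//   TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
	//   reporter.startCommunicationThread();
	//   ... run the map or reduce ...
	//   done(umbilical, reporter); // stops the communication thread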

	/**
	 * Reports the next executing record range to TaskTracker.
	 * 
	 * @param umbilical
	 * @param nextRecIndex
	 *            the record index which would be fed next.
	 * @throws IOException
	 */
	protected void reportNextRecordRange(final TaskUmbilicalProtocol umbilical,
			long nextRecIndex) throws IOException {
		// currentRecStartIndex is the start index which has not yet been
		// finished and is still in the task's stomach.
		long len = nextRecIndex - currentRecStartIndex + 1;
		SortedRanges.Range range = new SortedRanges.Range(currentRecStartIndex,
				len);
		taskStatus.setNextRecordRange(range);
		LOG.debug("sending reportNextRecordRange " + range);
		umbilical.reportNextRecordRange(taskId, range);
	}
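
	// Illustrative: if currentRecStartIndex is 100 and nextRecIndex is 104,
	// the reported range covers records [100, 104], i.e. length 5.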

	/**
	 * An updater that tracks the last number reported for a given file system
	 * and only creates the counters when they are needed.
	 */
	class FileSystemStatisticUpdater {
		private long prevReadBytes = 0;
		private long prevWriteBytes = 0;
		private FileSystem.Statistics stats;
		private Counters.Counter readCounter = null;
		private Counters.Counter writeCounter = null;
		private String[] counterNames;

		FileSystemStatisticUpdater(String uriScheme, FileSystem.Statistics stats) {
			this.stats = stats;
			this.counterNames = getFileSystemCounterNames(uriScheme);
		}

		void updateCounters() {
			long newReadBytes = stats.getBytesRead();
			long newWriteBytes = stats.getBytesWritten();
			if (prevReadBytes != newReadBytes) {
				if (readCounter == null) {
					readCounter = counters.findCounter(
							FILESYSTEM_COUNTER_GROUP, counterNames[0]);
				}
				readCounter.increment(newReadBytes - prevReadBytes);
				prevReadBytes = newReadBytes;
			}
			if (prevWriteBytes != newWriteBytes) {
				if (writeCounter == null) {
					writeCounter = counters.findCounter(
							FILESYSTEM_COUNTER_GROUP, counterNames[1]);
				}
				writeCounter.increment(newWriteBytes - prevWriteBytes);
				prevWriteBytes = newWriteBytes;
			}
		}
	}
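
	// Illustrative: if stats.getBytesRead() returns 0, then 1024, then 4096
	// across successive updateCounters() calls, the read counter is
	// incremented by 1024 and then by 3072, so it always mirrors the
	// cumulative total without double counting.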

	/**
	 * A map from URI scheme to the corresponding FileSystemStatisticUpdater.
	 */
	private Map<String, FileSystemStatisticUpdater> statisticUpdaters = new HashMap<String, FileSystemStatisticUpdater>();

	private synchronized void updateCounters() {
		for (Statistics stat : FileSystem.getAllStatistics()) {
			String uriScheme = stat.getScheme();
			FileSystemStatisticUpdater updater = statisticUpdaters
					.get(uriScheme);
			if (updater == null) { // a new FileSystem scheme has appeared
				updater = new FileSystemStatisticUpdater(uriScheme, stat);
				statisticUpdaters.put(uriScheme, updater);
			}
			updater.updateCounters();
		}
	}

	public void done(TaskUmbilicalProtocol umbilical, TaskReporter reporter)
			throws IOException, InterruptedException {
		LOG.info("Task:" + taskId + " is done."
				+ " And is in the process of commiting");
		updateCounters();

		// check whether the commit is required.
		boolean commitRequired = committer.needsTaskCommit(taskContext);
		if (commitRequired) {
			int retries = MAX_RETRIES;
			setState(TaskStatus.State.COMMIT_PENDING);
			// tell the task tracker that the task is commit pending
			while (true) {
				try {
					umbilical.commitPending(taskId, taskStatus);
					break;
				} catch (InterruptedException ie) {
					// ignore
				} catch (IOException ie) {
					LOG.warn("Failure sending commit pending: "
							+ StringUtils.stringifyException(ie));
					if (--retries == 0) {
						System.exit(67);
					}
				}
			}
			// wait for commit approval and commit
			commit(umbilical, reporter, committer);
		}
		taskDone.set(true);
		reporter.stopCommunicationThread();
		sendLastUpdate(umbilical);
		// signal the tasktracker that we are done
		sendDone(umbilical);
	}

	protected void statusUpdate(TaskUmbilicalProtocol umbilical)
			throws IOException {
		int retries = MAX_RETRIES;
		while (true) {
			try {
				if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
					LOG.warn("Parent died.  Exiting " + taskId);
					System.exit(66);
				}
				taskStatus.clearStatus();
				return;
			} catch (InterruptedException ie) {
				Thread.currentThread().interrupt(); // restore our interrupt status
			} catch (IOException ie) {
				LOG.warn("Failure sending status update: "
						+ StringUtils.stringifyException(ie));
				if (--retries == 0) {
					throw ie;
				}
			}
		}
	}

	private void sendLastUpdate(TaskUmbilicalProtocol umbilical)
			throws IOException {
		// send a final status report
		taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(),
				counters);
		statusUpdate(umbilical);
	}

	private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
		int retries = MAX_RETRIES;
		while (true) {
			try {
				umbilical.done(getTaskID());
				LOG.info("Task '" + taskId + "' done.");
				return;
			} catch (IOException ie) {
				LOG.warn("Failure signalling completion: "
						+ StringUtils.stringifyException(ie));
				if (--retries == 0) {
					throw ie;
				}
			}
		}
	}

	private void commit(TaskUmbilicalProtocol umbilical, TaskReporter reporter,
			org.apache.hadoop.mapreduce.OutputCommitter committer)
			throws IOException {
		int retries = MAX_RETRIES;
		while (true) {
			try {
				while (!umbilical.canCommit(taskId)) {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException ie) {
						// ignore
					}
					reporter.setProgressFlag();
				}
				break;
			} catch (IOException ie) {
				LOG.warn("Failure asking whether task can commit: "
						+ StringUtils.stringifyException(ie));
				if (--retries == 0) {
					// if it couldn't query successfully then delete the output
					discardOutput(taskContext);
					System.exit(68);
				}
			}
		}

		// task can commit now
		try {
			LOG.info("Task " + taskId + " is allowed to commit now");
			committer.commitTask(taskContext);
			return;
		} catch (IOException iee) {
			LOG.warn("Failure committing: "
					+ StringUtils.stringifyException(iee));
			// if it couldn't commit successfully then delete the output
			discardOutput(taskContext);
			throw iee;
		}
	}

	private void discardOutput(TaskAttemptContext taskContext) {
		try {
			committer.abortTask(taskContext);
		} catch (IOException ioe) {
			LOG.warn("Failure cleaning up: "
					+ StringUtils.stringifyException(ioe));
		}
	}

	protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical,
			TaskReporter reporter) throws IOException, InterruptedException {
		taskCleanup(umbilical);
		done(umbilical, reporter);
	}

	void taskCleanup(TaskUmbilicalProtocol umbilical) throws IOException {
		// set phase for this task
		setPhase(TaskStatus.Phase.CLEANUP);
		getProgress().setStatus("cleanup");
		statusUpdate(umbilical);
		LOG.info("Runnning cleanup for the task");
		// do the cleanup
		committer.abortTask(taskContext);
	}

	protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical,
			TaskReporter reporter) throws IOException, InterruptedException {
		// set phase for this task
		setPhase(TaskStatus.Phase.CLEANUP);
		getProgress().setStatus("cleanup");
		statusUpdate(umbilical);
		// do the cleanup
		committer.cleanupJob(jobContext);
		done(umbilical, reporter);
	}

	protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
			TaskReporter reporter) throws IOException, InterruptedException {
		// do the setup
		getProgress().setStatus("setup");
		committer.setupJob(jobContext);
		done(umbilical, reporter);
	}

	public void setConf(Configuration conf) {
		if (conf instanceof JobConf) {
			this.conf = (JobConf) conf;
		} else {
			this.conf = new JobConf(conf);
		}
		this.reduceOutputFile.setConf(this.conf);
		this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
		// add the static resolutions (this is required for JUnit to work on
		// test cases that simulate multiple nodes on a single physical node);
		// entries have the form "host=resolvedname"
		String[] hostToResolved = conf
				.getStrings("hadoop.net.static.resolutions");
		if (hostToResolved != null) {
			for (String str : hostToResolved) {
				String name = str.substring(0, str.indexOf('='));
				String resolvedName = str.substring(str.indexOf('=') + 1);
				NetUtils.addStaticResolution(name, resolvedName);
			}
		}

		/**
		 * HaLoop: get the number of loop body steps
		 */
		numOfLoopBodySteps = this.conf.getNumberOfLoopBodySteps();
	}

	public Configuration getConf() {
		return this.conf;
	}

	/**
	 * OutputCollector for the combiner.
	 */
	protected static class CombineOutputCollector<K extends Object, V extends Object>
			implements OutputCollector<K, V> {
		private Writer<K, V> writer;
		private Counters.Counter outCounter;

		public CombineOutputCollector(Counters.Counter outCounter) {
			this.outCounter = outCounter;
		}

		public synchronized void setWriter(Writer<K, V> writer) {
			this.writer = writer;
		}

		public synchronized void collect(K key, V value) throws IOException {
			outCounter.increment(1);
			writer.append(key, value);
		}
	}
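
	// Usage sketch (illustrative; assumes a combiner runner and an IFile.Writer
	// for the current spill):
	//   CombineOutputCollector<K, V> combineCollector =
	//       new CombineOutputCollector<K, V>(combineOutputCounter);
	//   combineCollector.setWriter(writer);
	//   combinerRunner.combine(kvIter, combineCollector);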

	protected static class CombineValuesIterator<KEY, VALUE> extends
			ValuesIterator<KEY, VALUE> {

		private final Counters.Counter combineInputCounter;

		public CombineValuesIterator(RawKeyValueIterator in,
				RawComparator<KEY> comparator, Class<KEY> keyClass,
				Class<VALUE> valClass, Configuration conf, Reporter reporter,
				Counters.Counter combineInputCounter) throws IOException {
			super(in, comparator, keyClass, valClass, conf, reporter);
			this.combineInputCounter = combineInputCounter;
		}

		public VALUE next() {
			combineInputCounter.increment(1);
			return super.next();
		}
	}

	private static final Constructor<org.apache.hadoop.mapreduce.Reducer.Context> contextConstructor;
	static {
		try {
			contextConstructor = org.apache.hadoop.mapreduce.Reducer.Context.class
					.getConstructor(new Class[] {
							org.apache.hadoop.mapreduce.Reducer.class,
							Configuration.class,
							org.apache.hadoop.mapreduce.TaskAttemptID.class,
							RawKeyValueIterator.class,
							org.apache.hadoop.mapreduce.Counter.class,
							org.apache.hadoop.mapreduce.RecordWriter.class,
							org.apache.hadoop.mapreduce.OutputCommitter.class,
							org.apache.hadoop.mapreduce.StatusReporter.class,
							RawComparator.class, Class.class, Class.class });
		} catch (NoSuchMethodException nme) {
			throw new IllegalArgumentException("Can't find constructor", nme);
		}
	}

	@SuppressWarnings("unchecked")
	protected static <INKEY, INVALUE, OUTKEY, OUTVALUE> org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context createReduceContext(
			org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer,
			Configuration job,
			org.apache.hadoop.mapreduce.TaskAttemptID taskId,
			RawKeyValueIterator rIter,
			org.apache.hadoop.mapreduce.Counter inputCounter,
			org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> output,
			org.apache.hadoop.mapreduce.OutputCommitter committer,
			org.apache.hadoop.mapreduce.StatusReporter reporter,
			RawComparator<INKEY> comparator, Class<INKEY> keyClass,
			Class<INVALUE> valueClass) throws IOException,
			ClassNotFoundException {
		try {

			return contextConstructor.newInstance(reducer, job, taskId, rIter,
					inputCounter, output, committer, reporter, comparator,
					keyClass, valueClass);
		} catch (InstantiationException e) {
			throw new IOException("Can't create Context", e);
		} catch (InvocationTargetException e) {
			throw new IOException("Can't invoke Context constructor", e);
		} catch (IllegalAccessException e) {
			throw new IOException("Can't invoke Context constructor", e);
		}
	}

	protected static abstract class CombinerRunner<K, V> {
		protected final Counters.Counter inputCounter;
		protected final JobConf job;
		protected final TaskReporter reporter;

		CombinerRunner(Counters.Counter inputCounter, JobConf job,
				TaskReporter reporter) {
			this.inputCounter = inputCounter;
			this.job = job;
			this.reporter = reporter;
		}

		/**
		 * Run the combiner over a set of inputs.
		 * 
		 * @param iterator
		 *            the key/value pairs to use as input
		 * @param collector
		 *            the output collector
		 */
		abstract void combine(RawKeyValueIterator iterator,
				OutputCollector<K, V> collector) throws IOException,
				InterruptedException, ClassNotFoundException;

		static <K, V> CombinerRunner<K, V> create(JobConf job,
				TaskAttemptID taskId, Counters.Counter inputCounter,
				TaskReporter reporter,
				org.apache.hadoop.mapreduce.OutputCommitter committer)
				throws ClassNotFoundException {
			Class<? extends Reducer<K, V, K, V>> cls = (Class<? extends Reducer<K, V, K, V>>) job
					.getCombinerClass();
			if (cls != null) {
				return new OldCombinerRunner(cls, job, inputCounter, reporter);
			}
			// make a task context so we can get the classes
			org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(
					job, taskId);
			Class<? extends org.apache.hadoop.mapreduce.Reducer<K, V, K, V>> newcls = (Class<? extends org.apache.hadoop.mapreduce.Reducer<K, V, K, V>>) taskContext
					.getCombinerClass();
			if (newcls != null) {
				return new NewCombinerRunner<K, V>(newcls, job, taskId,
						taskContext, inputCounter, reporter, committer);
			}

			return null;
		}
	}

	protected static class OldCombinerRunner<K, V> extends CombinerRunner<K, V> {
		private final Class<? extends Reducer<K, V, K, V>> combinerClass;
		private final Class<K> keyClass;
		private final Class<V> valueClass;
		private final RawComparator<K> comparator;

		protected OldCombinerRunner(Class<? extends Reducer<K, V, K, V>> cls,
				JobConf conf, Counters.Counter inputCounter,
				TaskReporter reporter) {
			super(inputCounter, conf, reporter);
			combinerClass = cls;
			keyClass = (Class<K>) job.getMapOutputKeyClass();
			valueClass = (Class<V>) job.getMapOutputValueClass();
			comparator = (RawComparator<K>) job.getOutputKeyComparator();
		}

		@SuppressWarnings("unchecked")
		protected void combine(RawKeyValueIterator kvIter,
				OutputCollector<K, V> combineCollector) throws IOException {
			Reducer<K, V, K, V> combiner = ReflectionUtils.newInstance(
					combinerClass, job);
			try {
				CombineValuesIterator<K, V> values = new CombineValuesIterator<K, V>(
						kvIter, comparator, keyClass, valueClass, job,
						Reporter.NULL, inputCounter);
				while (values.more()) {
					combiner.reduce(values.getKey(), values, combineCollector,
							Reporter.NULL);
					values.nextKey();
				}
			} finally {
				combiner.close();
			}
		}
	}

	protected static class NewCombinerRunner<K, V> extends CombinerRunner<K, V> {
		private final Class<? extends org.apache.hadoop.mapreduce.Reducer<K, V, K, V>> reducerClass;
		private final org.apache.hadoop.mapreduce.TaskAttemptID taskId;
		private final RawComparator<K> comparator;
		private final Class<K> keyClass;
		private final Class<V> valueClass;
		private final org.apache.hadoop.mapreduce.OutputCommitter committer;

		NewCombinerRunner(Class reducerClass, JobConf job,
				org.apache.hadoop.mapreduce.TaskAttemptID taskId,
				org.apache.hadoop.mapreduce.TaskAttemptContext context,
				Counters.Counter inputCounter, TaskReporter reporter,
				org.apache.hadoop.mapreduce.OutputCommitter committer) {
			super(inputCounter, job, reporter);
			this.reducerClass = reducerClass;
			this.taskId = taskId;
			keyClass = (Class<K>) context.getMapOutputKeyClass();
			valueClass = (Class<V>) context.getMapOutputValueClass();
			comparator = (RawComparator<K>) context.getSortComparator();
			this.committer = committer;
		}

		private static class OutputConverter<K, V> extends
				org.apache.hadoop.mapreduce.RecordWriter<K, V> {
			OutputCollector<K, V> output;

			OutputConverter(OutputCollector<K, V> output) {
				this.output = output;
			}

			@Override
			public void close(
					org.apache.hadoop.mapreduce.TaskAttemptContext context) {
			}

			@Override
			public void write(K key, V value) throws IOException,
					InterruptedException {
				output.collect(key, value);
			}
		}

		@Override
		void combine(RawKeyValueIterator iterator,
				OutputCollector<K, V> collector) throws IOException,
				InterruptedException, ClassNotFoundException {
			// make a reducer
			org.apache.hadoop.mapreduce.Reducer<K, V, K, V> reducer = (org.apache.hadoop.mapreduce.Reducer<K, V, K, V>) ReflectionUtils
					.newInstance(reducerClass, job);
			org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(
					reducer, job, taskId, iterator, inputCounter,
					new OutputConverter(collector), committer, reporter,
					comparator, keyClass, valueClass);
			reducer.run(reducerContext);
		}

	}

}
